package rabbitmq

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/qit-team/snow-core/redis"
	"snow-im/app/utils"
	"snow-im/config"
	"strconv"
)

type KafkaMq struct {

}



//同步
func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	kafConfig := sarama.NewConfig()
	kafConfig.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
	kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
	kafConfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回

	// 构造一个消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = name
	var data,_ = json.Marshal(log)
	msg.Value = sarama.StringEncoder(string(data))
	// 连接kafka
	client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return nil
	}
	defer client.Close()
	// 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		utils.Log(nil,"send msg failed, err:", err,pid,offset)
		return nil
	}
	fmt.Println(pid,offset)
	return  nil
}

func (n KafkaMq) Consume(name string, hand interface{})  {
	consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil)
	if err != nil {
		utils.Log(nil,"kafka comsume", err)
		return
	}
	partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区
	if err != nil {
		utils.Log(nil,"kafka comsume", err)
		return
	}
	utils.Log(nil,"kafka comsume",partitionList)
	for partition := range partitionList { // 遍历所有的分区
		// 针对每个分区创建一个对应的分区消费者
		var offsetReDis ,_ = redis.GetRedis().Get("kafka_consume:"+strconv.Itoa(int(partition)))
		offset := sarama.OffsetNewest
		offset = 0
		if offsetReDis != ""{
			offsetData,_ := strconv.Atoi(offsetReDis)
			offset = int64(offsetData)
		}
		utils.Log(nil,"**",offset,partition)
		pc, err := consumer.ConsumePartition(name, int32(partition),offset )
		if err != nil {
			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		// 异步从每个分区消费信息
		go func(sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				//fmt.Println("ddde")
				utils.Log(nil,string(msg.Value),msg.Offset)
				//fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
				//var handler = hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
				//handler(0,nil,msg.Value)
				//redis记录
				redis.GetRedis().Set("kafka_consume:"+strconv.Itoa(int(msg.Partition)),msg.Offset)
			}
		}(pc)
		select {

		}
	}
}
